[Python] 好用的 concurrent.futures is a good way to speed up your function


Posted by jerryeml on 2021-11-10

前言

有鑑於之前介紹了許多關於多線程跟多進程的東西
Golang-Advance- GO routine
Multi-processing 和 Multi-threading 的 Differenece:


如果喜歡,請幫我按讚訂閱分享並且開啟小鈴鐺(目前還沒有XD)

快速 Recap 一下 Concurrency and Parallelism

順序執行:老師甲先幫學生A輔導,輔導完之後再取給B輔導,最後再去給C輔導。
Concurrency:老師甲先給學生A去講思路,A聽懂了自己書寫過程,這期間甲老師去給B講思路,講完思路,B自己書寫過程,這期間再去給C講思路。這樣老師就沒有空著,一直在做事情。與順序執行不同的是,順序執行,老師講完思路之後學生再寫步驟,這期間老師是空閒的。
Parallelism:直接讓三個老師甲、乙、丙,「同時」給三個學生輔導作業。

有興趣的人可以先回去複習一下XD

Python 中實現 Concurrency 與 Parallelism

根據上圖 我們發現 要實現 Concurrency 一個好的做法就是利用 Threading 或是 Async IO 的方式

剛好concurrent.futures 這個好用的 Python package 可以滿足我們的需求

在我們正式開始之前,先了解關於 Future 模式的相關知識

首先 Future 是什麼?

Future 其實是生產-消費者模型的一種擴充套件,在生產-消費者模型中,生產者不關心消費者什麼時候處理完資料,也不關心消費者處理的結果。比如我們經常寫出如下的程式碼

import multiprocessing, Queue
import os
import time
from multiprocessing import Process
from time import sleep
from random import randint

class Producer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            self.queue.put(`one product`)
            print(multiprocessing.current_process().name + str(os.getpid()) + ` produced one product, the no of queue now is: %d` %self.queue.qsize())
            sleep(randint(1, 3))


class Consumer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            d = self.queue.get(1)
            if d != None:
                print(multiprocessing.current_process().name + str(os.getpid()) + ` consumed  %s, the no of queue now is: %d` %(d,self.queue.qsize()))
                sleep(randint(1, 4))
                continue
            else:
                break

#create queue
queue = multiprocessing.Queue(40)

if __name__ == "__main__":
    print(`Excited!")
    #create processes    
    processed = []
    for i in range(3):
        processed.append(Producer(queue))
        processed.append(Consumer(queue))

    #start processes        
    for i in range(len(processed)):
        processed[i].start()

    #join processes    
    for i in range(len(processed)):
        processed[i].join()複製程式碼

這就是生產-消費者模型的一個簡單的實現,我們利用一個 multiprocessing 中的 Queue 來作為通訊渠道,我們的生產者負責往佇列中傳入資料,消費者負責從佇列中獲取資料並處理。不過就如同上面所說的一樣,在這種模式中,生產者並不關心消費者何時處理完資料,也不關心處理的結果。

concurrent.futures

Python 3.2 以後, concurrent.futures 是內建的模組,我們可以直接使用

Note: 如果你需要在 Python 2.7 中使用 concurrent.futures , 那麼請用 pip 進行安裝,pip install futures

Python 關於平行處理的模組除了 multiprocessingthreading 之外,其實還提供 1 個更為簡單易用的 concurrent.futures 可以使用。

concurrent.futures 提供了一組高階 API 給使用者操作非同步執行的任務。透過 ThreadPoolExectuor 執行 thread 層級的非同步任務,或是使用 ProcessPoolExecutor 執行 process 層級的非同步任務。兩者的 API 介面都相同,同樣繼承於 Executor
(今天會針對 Thread 做介紹)

ThreadPoolExecutor

ThreadPoolExecutor 如其名,透過 Thread 的方式建立多個 Executors ,用以執行消化多個任務(tasks)

例如以下範例,建立 1 個 ThreadPoolExecutor 以最多不超過 5 個 Threads 的方式平行執行 vision_one_has ,每個 vision_one_has 所需要的參數都是透過呼叫 submit 的方式交給 Executer 處理:

from concurrent.futures import ThreadPoolExecutor

def vision_one_has(team):
    print(team)

teams = ['Elpis', 'Genger', 'Matrix', 'Sorlax', 'Lapras']

with ThreadPoolExecutor(max_workers=5) as executor:
    for n in teams:
        executor.submit(vision_one_has, n)

上述範例執行結果如下:

Elpis
Genger
Matrix
Sorlax
Lapras

Future objects

接著談談 concurrent.futures 模組中相當重要的角色 —— Future

事實上,當呼叫 submit 後,會回傳的並不是在 Thread 執行的程式結果,而是 Future 的實例,而這個實例是一個執行結果的代理 (Proxy) ,所以我們可以透過 done , running , cancelled 等方法詢問 Future實例在 Thread 中執行的程式狀態如何,如果程式已經進入 done 的狀態,則可以呼叫 result 取得結果。Link

不過 Python 也提供更簡單的方法 —— as_completed ,幫忙檢查狀態,所以可以少寫一些程式碼。

因此前述範例可以進一步改成以下形式:

from concurrent.futures import ThreadPoolExecutor, as_completed

def vision_one_has(team):
    return f'Hi, {team}'

teams = ['Elpis', 'Genger', 'Matrix', 'Sorlax', 'Lapras']

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = []
    for n in names:
        future = executor.submit(vision_one_has, n)
        print(type(future))
        futures.append(future)

    for future in as_completed(futures):
        print(future.result())

上述範例在第 11 行取得 future 實例之後,在第 13 行將其放進 futures list 中,接著在第 15 行透過 as_completed(futures) 一個一個取得已經完成執行的 future 實例,並透過 result() 取得其結果後並列印出來。

其執行結果如下:

<class 'concurrent.futures._base.Future'>
<class 'concurrent.futures._base.Future'>
<class 'concurrent.futures._base.Future'>
<class 'concurrent.futures._base.Future'>
<class 'concurrent.futures._base.Future'>
Hi, Genger
Hi, Matrix
Hi, Elpis
Hi, Lapras
Hi, Sorlax

也由於我們將列印的功能從 Thread 內搬出,所以也解決列印文字可能黏在一起的情況。

除了以 submit() 先取得 Future 實例再逐一檢查狀態並取得結果之外,也可以直接利用 map() 方法直接取得 Thread 的執行結果,例如以下範例:

from concurrent.futures import ThreadPoolExecutor, as_completed


def vision_one_has(team):
    for i in range(100000):
        pass
    return f'Hi, {team}'


teams = ['Elpis', 'Genger', 'Matrix', 'Sorlax', 'Lapras']

with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(vision_one_has, teams)

for r in results:
    print(r)

ProcessPoolExecutor

ProcessPoolExecutor 的使用方法與 ThreadPoolExecutor 一模一樣,基本上視需求選擇使用 ThreadPoolExecutorProcessPoolExecutor 即可。

不過值得注意的是 Python 3.5 之後 map() 方法多了 1 個 chunksize 參數可以使用,而該參數只對 ProcessPoolExecutor 有效,該參數可以提升 ProcessPoolExecutor 在處理大量 iterables 的執行效能。

ThreadPoolExecutor vs ProcessPoolExecutor 該怎麼選!?

Thread 會在同一個 Process 裡面做事,遇到被 I/O waiting hang-up 的時候 (例如 waiting for socket response),會由另外一個 Thread 繼續做事。當任務有很多 I/O 的動作時,就適合使用 ThreadPoolExecutor

Process 則是會開新的 Process 來處理,因此對於高 CPU 計算的工作帶來效益,不同於 socket 的狀況,這些計算並不會把自身 hang-up,而會持續不斷的計算。例如說費氏數列的計算就是這樣。對於這種狀況,使用 Process 就如同開影分身,能夠讓整體的計算更快完成。這時候就適合使用 ProcessPoolExecutor

Reference

https://docs.python.org/3/library/concurrent.futures.html
https://myapollo.com.tw/zh-tw/python-concurrent-futures/
https://iter01.com/52341.html
https://blog.louie.lu/2017/08/01/%E4%BD%A0%E6%89%80%E4%B8%8D%E7%9F%A5%E9%81%93%E7%9A%84-python-%E6%A8%99%E6%BA%96%E5%87%BD%E5%BC%8F%E5%BA%AB%E7%94%A8%E6%B3%95-06-concurrent-futures/
https://www.oulub.com/zh-TW/Python/library.concurrent.futures
https://www.toptal.com/python/beginners-guide-to-concurrency-and-parallelism-in-python


#Python







Related Posts

淺談 Markdown 格式

淺談 Markdown 格式

JS30 Day 11 筆記

JS30 Day 11 筆記

How to solve the perpetual loading issue in Evernote? Evernote 一直轉圈圈的解決辦法

How to solve the perpetual loading issue in Evernote? Evernote 一直轉圈圈的解決辦法


Comments